# coding: UTF-8

import pandas as pd
from pyspark import SparkContext, SparkConf
from pyspark.sql import HiveContext
from pyspark.sql import SQLContext
import sys

# Python 2 idiom: reload(sys) re-exposes setdefaultencoding(), then force
# UTF-8 so the Chinese SMS labels and city names round-trip without
# UnicodeDecodeError.
reload(sys)
import xlrd
import os

sys.setdefaultencoding('utf-8')

# Build the Spark entry points once at module level. The previous version
# left the SparkContext creation commented out (so `sc` was undefined) and
# bound the context to the misspelled name `sqlContest`, while the
# functions below read `sqlContext` -- both were NameErrors at runtime.
conf = SparkConf().set("spark.sql.parquet.compression.codec", "snappy")
sc = SparkContext("yarn-client", "extraction_tmp", conf=conf)
sqlContext = SQLContext(sc)
sqlContest = sqlContext  # backward-compat alias for the old misspelling


def extractions_tmp(smslabel, nums, dir,
                    history_dir="/home/zhengjj/90653918550f4a53b4e2ee259c5e3eca/extraction_mt_log/test/test/",
                    out_dir="/home/zhengjj/90653918550f4a53b4e2ee259c5e3eca/extraction_mt_log/test/test_result/"):
    """Extract up to `nums` phone numbers matching an SMS label and dump
    them to an Excel file.

    Numbers are read from the Phoenix table SMS.SMSGATEWAY_20171115,
    filtered by provider (provider 1, or provider 2 with a 186 prefix),
    by `smslabel` appearing in SMSLABELS, and by city exclusions; numbers
    already present in the history files under `history_dir` are removed
    (cnt = 0 keeps only rows never seen in the history union).

    :param smslabel: label substring to match in SMSLABELS (stripped).
    :param nums: maximum number of rows to extract (SQL LIMIT).
    :param dir: output file basename (note: shadows the `dir` builtin;
        kept for backward compatibility with existing callers).
    :param history_dir: directory of previously-extracted Excel files,
        registered as temp table "old_df" via eachFile().
    :param out_dir: directory the result workbook is written to.
    """
    alldf = sqlContext.read.format("org.apache.phoenix.spark") \
        .option("table", "SMS.SMSGATEWAY_20171115") \
        .option("zkUrl", "master-01,master-02:2181") \
        .load()
    alldf.registerTempTable("mt_history")
    # Registers the "old_df" temp table of already-extracted msisdns.
    eachFile(history_dir)
    # %% escapes a literal %, so '%%%(smslabel)s%%' renders as
    # '%<smslabel>%' for the LIKE clause.
    df = sqlContext.sql("""
        select
            t3.msisdn,
            t3.city city,
            t3.provider_id
        FROM (
            select
                msisdn msisdn,
                sum(extraction_times) cnt,
                city city,
                provider_id provider_id
            from (
                select
                    t.msisdn msisdn ,
                    0 extraction_times,
                    t.city city,
                    t.provider_id provider_id
                from
                    mt_history t
                where
                    (provider_id =1
                    or (provider_id =2 and substr(msisdn,1,3) = '186'))
                    and SMSLABELS like '%%%(smslabel)s%%' and city not in('北京','上海','深圳市','沈阳市')
                union all
                select
                    t1.msisdn,
                    1 extraction_times,
                    '' city,
                    0 provider_id
                from
                    old_df t1
            ) t2
            group by
                t2.msisdn,
                t2.city,
                t2.provider_id
        ) t3
        where
            t3.cnt = 0
        limit %(nums)d
    """ % {"nums": nums, "smslabel": smslabel.strip()})
    writer = pd.ExcelWriter("""%(out)s%(dir)s.xlsx""" % {"out": out_dir, "dir": dir},
                            engine='xlsxwriter')
    df.toPandas().to_excel(writer, sheet_name='Sheet1')
    writer.save()


def eachFile(filepath):
    pathDir = os.listdir(filepath)
    index = 1
    for allDir in pathDir:
        read_file_df = pd.read_excel(filepath + allDir)
        sqlContest = SQLContext(sc)
        spark_df = sqlContest.createDataFrame(read_file_df)
        spark_df.registerTempTable("spark_df_" + str(index))
        index += 1
        print str(index)
    sql = """select
                cast(msisdn as string) msisdn,
                1  extraction_times
            from
                spark_df_1
    """
    for i in range(2, index):
        sql += """
            union all
            select
                cast(msisdn as string) msisdn,
                1  extraction_times
            from spark_df_%(index)s
        """ % {"index": i}
    print sql
    old_df = sqlContext.sql(sql)
    old_df.registerTempTable("old_df")


if __name__ == '__main__':
    # The original calls were not indented under the guard, which made the
    # whole file an IndentationError at parse time.
    extractions_tmp("龙之谷", 5000, "longzhigu_5k")
    # NOTE(review): output name 'xianjianqingyuan_5K' does not match the
    # label '球球大作战' -- confirm the intended file name with the owner.
    extractions_tmp("球球大作战", 5000, "xianjianqingyuan_5K")
